/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.tomcat.util.net;

import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState;
import org.apache.tomcat.util.security.PrivilegedSetTccl;

import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;

/**
 * Handle incoming TCP connections.
 * <p>
 * This class implement a simple server model: one listener thread accepts on a socket and
 * creates a new worker thread for each incoming connection.
 * <p>
 * More advanced Endpoints will reuse the threads, use queues, etc.
 *
 * @author James Duncan Davidson
 * @author Jason Hunter
 * @author James Todd
 * @author Costin Manolache
 * @author Gal Shachor
 * @author Yoav Shapira
 * @author Remy Maucherat
 */
public class JIoEndpoint extends AbstractEndpoint<Socket> {


	// -------------------------------------------------------------- Constants

	private static final Log log = LogFactory.getLog(JIoEndpoint.class);

	// ----------------------------------------------------------------- Fields

	/**
	 * Associated server socket.
	 */
	protected ServerSocket serverSocket = null;


	// ------------------------------------------------------------ Constructor
	/**
	 * Handling of accepted sockets.
	 */
	protected Handler handler = null;

	// ------------------------------------------------------------- Properties
	/**
	 * Server socket factory.
	 */
	protected ServerSocketFactory serverSocketFactory = null;
	protected ConcurrentLinkedQueue<SocketWrapper<Socket>> waitingRequests =
			new ConcurrentLinkedQueue<SocketWrapper<Socket>>();

	public JIoEndpoint() {
		// Set maxConnections to zero so we can tell if the user has specified
		// their own value on the connector when we reach bind()
		setMaxConnections(0);
		// Reduce the executor timeout for BIO as threads in keep-alive will not
		// terminate when the executor interrupts them.
		setExecutorTerminationTimeoutMillis(0);
	}

	public Handler getHandler() {
		return handler;
	}

	public void setHandler(Handler handler) {
		this.handler = handler;
	}

	public ServerSocketFactory getServerSocketFactory() {
		return serverSocketFactory;
	}

	public void setServerSocketFactory(ServerSocketFactory factory) {
		this.serverSocketFactory = factory;
	}

	/**
	 * Port in use.
	 */
	@Override
	public int getLocalPort() {
		ServerSocket s = serverSocket;
		if (s == null) {
			return -1;
		} else {
			return s.getLocalPort();
		}
	}

	/*
	 * Optional feature support.
	 */
	@Override
	public boolean getUseSendfile() {
		return false;
	} // Not supported

	@Override
	public boolean getUseComet() {
		return false;
	} // Not supported

	@Override
	public boolean getUseCometTimeout() {
		return false;
	} // Not supported

	@Override
	public boolean getDeferAccept() {
		return false;
	} // Not supported


	// ------------------------------------------------ Handler Inner Interface

	@Override
	public boolean getUsePolling() {
		return false;
	} // Not supported

	private void closeSocket(Socket socket) {
		try {
			socket.close();
		} catch (IOException e) {
			// Ignore
		}
	}


	// --------------------------------------------------- Acceptor Inner Class

	@Override
	public void bind() throws Exception {

		// Initialize thread count defaults for acceptor
		if (acceptorThreadCount == 0) {
			acceptorThreadCount = 1;
		}
		// Initialize maxConnections
		if (getMaxConnections() == 0) {
			// User hasn't set a value - use the default
			setMaxConnections(getMaxThreadsInternal());
		}

		if (serverSocketFactory == null) {
			if (isSSLEnabled()) {
				serverSocketFactory =
						handler.getSslImplementation().getServerSocketFactory(this);
			} else {
				serverSocketFactory = new DefaultServerSocketFactory(this);
			}
		}

		if (serverSocket == null) {
			try {
				if (getAddress() == null) {
					serverSocket = serverSocketFactory.createSocket(getPort(),
							getBacklog());
				} else {
					serverSocket = serverSocketFactory.createSocket(getPort(),
							getBacklog(), getAddress());
				}
			} catch (BindException orig) {
				String msg;
				if (getAddress() == null)
					msg = orig.getMessage() + " <null>:" + getPort();
				else
					msg = orig.getMessage() + " " +
							getAddress().toString() + ":" + getPort();
				BindException be = new BindException(msg);
				be.initCause(orig);
				throw be;
			}
		}

	}

	@Override
	public void startInternal() throws Exception {

		if (!running) {
			running = true;
			paused = false;

			// Create worker collection
			if (getExecutor() == null) {
				createExecutor();
			}

			initializeConnectionLatch();

			startAcceptorThreads();

			// Start async timeout thread
			Thread timeoutThread = new Thread(new AsyncTimeout(),
					getName() + "-AsyncTimeout");
			timeoutThread.setPriority(threadPriority);
			timeoutThread.setDaemon(true);
			timeoutThread.start();
		}
	}


	// ------------------------------------------- SocketProcessor Inner Class

	@Override
	public void stopInternal() {
		releaseConnectionLatch();
		if (!paused) {
			pause();
		}
		if (running) {
			running = false;
			unlockAccept();
		}
		shutdownExecutor();
	}


	// -------------------- Public methods --------------------

	/**
	 * Deallocate APR memory pools, and close server socket.
	 */
	@Override
	public void unbind() throws Exception {
		if (running) {
			stop();
		}
		if (serverSocket != null) {
			try {
				if (serverSocket != null)
					serverSocket.close();
			} catch (Exception e) {
				log.error(sm.getString("endpoint.err.close"), e);
			}
			serverSocket = null;
		}
		handler.recycle();
	}

	@Override
	protected AbstractEndpoint.Acceptor createAcceptor() {
		return new Acceptor();
	}

	/**
	 * Configure the socket.
	 */
	protected boolean setSocketOptions(Socket socket) {
		try {
			// 1: Set socket options: timeout, linger, etc
			socketProperties.setProperties(socket);
		} catch (SocketException s) {
			//error here is common if the client has reset the connection
			if (log.isDebugEnabled()) {
				log.debug(sm.getString("endpoint.err.unexpected"), s);
			}
			// Close the socket
			return false;
		} catch (Throwable t) {
			ExceptionUtils.handleThrowable(t);
			log.error(sm.getString("endpoint.err.unexpected"), t);
			// Close the socket
			return false;
		}
		return true;
	}

	/**
	 * Process a new connection from a new client. Wraps the socket so
	 * keep-alive and other attributes can be tracked and then passes the socket
	 * to the executor for processing.
	 *
	 * @param socket The socket associated with the client.
	 * @return <code>true</code> if the socket is passed to the
	 * executor, <code>false</code> if something went wrong or
	 * if the endpoint is shutting down. Returning
	 * <code>false</code> is an indication to close the socket
	 * immediately.
	 */
	protected boolean processSocket(Socket socket) {
		// Process the request from this socket
		try {
			SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
			wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
			wrapper.setSecure(isSSLEnabled());
			// During shutdown, executor may be null - avoid NPE
			if (!running) {
				return false;
			}
			getExecutor().execute(new SocketProcessor(wrapper));
		} catch (RejectedExecutionException x) {
			log.warn("Socket processing request was rejected for:" + socket, x);
			return false;
		} catch (Throwable t) {
			ExceptionUtils.handleThrowable(t);
			// This means we got an OOM or similar creating a thread, or that
			// the pool and its queue are full
			log.error(sm.getString("endpoint.process.fail"), t);
			return false;
		}
		return true;
	}

	/**
	 * Process an existing async connection. If processing is required, passes
	 * the wrapped socket to an executor for processing.
	 *
	 * @param socket The socket associated with the client.
	 * @param status Only OPEN and TIMEOUT are used. The others are used for
	 *               Comet requests that are not supported by the BIO (JIO)
	 *               Connector.
	 */
	@Override
	public void processSocketAsync(SocketWrapper<Socket> socket,
	                               SocketStatus status) {
		try {
			synchronized (socket) {
				if (waitingRequests.remove(socket)) {
					SocketProcessor proc = new SocketProcessor(socket, status);
					ClassLoader loader = Thread.currentThread().getContextClassLoader();
					try {
						//threads should not be created by the webapp classloader
						if (Constants.IS_SECURITY_ENABLED) {
							PrivilegedAction<Void> pa = new PrivilegedSetTccl(
									getClass().getClassLoader());
							AccessController.doPrivileged(pa);
						} else {
							Thread.currentThread().setContextClassLoader(
									getClass().getClassLoader());
						}
						// During shutdown, executor may be null - avoid NPE
						if (!running) {
							return;
						}
						getExecutor().execute(proc);
						//TODO gotta catch RejectedExecutionException and properly handle it
					} finally {
						if (Constants.IS_SECURITY_ENABLED) {
							PrivilegedAction<Void> pa = new PrivilegedSetTccl(loader);
							AccessController.doPrivileged(pa);
						} else {
							Thread.currentThread().setContextClassLoader(loader);
						}
					}
				}
			}
		} catch (Throwable t) {
			ExceptionUtils.handleThrowable(t);
			// This means we got an OOM or similar creating a thread, or that
			// the pool and its queue are full
			log.error(sm.getString("endpoint.process.fail"), t);
		}
	}

	@Override
	public void removeWaitingRequest(SocketWrapper<Socket> socketWrapper) {
		waitingRequests.remove(socketWrapper);
	}

	@Override
	protected Log getLog() {
		return log;
	}

	/**
	 * Bare bones interface used for socket processing. Per thread data is to be
	 * stored in the ThreadWithAttributes extra folders, or alternately in
	 * thread local fields.
	 */
	public interface Handler extends AbstractEndpoint.Handler {
		public SocketState process(SocketWrapper<Socket> socket,
		                           SocketStatus status);

		public SSLImplementation getSslImplementation();
	}

	/**
	 * Async timeout thread
	 */
	protected class AsyncTimeout implements Runnable {
		/**
		 * The background thread that checks async requests and fires the
		 * timeout if there has been no activity.
		 */
		@Override
		public void run() {

			// Loop until we receive a shutdown command
			while (running) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					// Ignore
				}
				long now = System.currentTimeMillis();
				Iterator<SocketWrapper<Socket>> sockets =
						waitingRequests.iterator();
				while (sockets.hasNext()) {
					SocketWrapper<Socket> socket = sockets.next();
					long access = socket.getLastAccess();
					if (socket.getTimeout() > 0 &&
							(now - access) > socket.getTimeout()) {
						// Prevent multiple timeouts
						socket.setTimeout(-1);
						processSocketAsync(socket, SocketStatus.TIMEOUT);
					}
				}

				// Loop if endpoint is paused
				while (paused && running) {
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						// Ignore
					}
				}

			}
		}
	}

	/**
	 * The background thread that listens for incoming TCP/IP connections and
	 * hands them off to an appropriate processor.
	 */
	protected class Acceptor extends AbstractEndpoint.Acceptor {

		@Override
		public void run() {

			int errorDelay = 0;

			// Loop until we receive a shutdown command
			while (running) {

				// Loop if endpoint is paused
				while (paused && running) {
					state = AcceptorState.PAUSED;
					try {
						Thread.sleep(50);
					} catch (InterruptedException e) {
						// Ignore
					}
				}

				if (!running) {
					break;
				}
				state = AcceptorState.RUNNING;

				try {
					//if we have reached max connections, wait
					countUpOrAwaitConnection();

					Socket socket = null;
					try {
						// Accept the next incoming connection from the server
						// socket
						socket = serverSocketFactory.acceptSocket(serverSocket);
					} catch (IOException ioe) {
						countDownConnection();
						// Introduce delay if necessary
						errorDelay = handleExceptionWithDelay(errorDelay);
						// re-throw
						throw ioe;
					}
					// Successful accept, reset the error delay
					errorDelay = 0;

					// Configure the socket
					if (running && !paused && setSocketOptions(socket)) {
						// Hand this socket off to an appropriate processor
						if (!processSocket(socket)) {
							countDownConnection();
							// Close socket right away
							closeSocket(socket);
						}
					} else {
						countDownConnection();
						// Close socket right away
						closeSocket(socket);
					}
				} catch (IOException x) {
					if (running) {
						log.error(sm.getString("endpoint.accept.fail"), x);
					}
				} catch (NullPointerException npe) {
					if (running) {
						log.error(sm.getString("endpoint.accept.fail"), npe);
					}
				} catch (Throwable t) {
					ExceptionUtils.handleThrowable(t);
					log.error(sm.getString("endpoint.accept.fail"), t);
				}
			}
			state = AcceptorState.ENDED;
		}
	}

	/**
	 * This class is the equivalent of the Worker, but will simply use in an
	 * external Executor thread pool.
	 */
	protected class SocketProcessor implements Runnable {

		protected SocketWrapper<Socket> socket = null;
		protected SocketStatus status = null;

		public SocketProcessor(SocketWrapper<Socket> socket) {
			if (socket == null) throw new NullPointerException();
			this.socket = socket;
		}

		public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) {
			this(socket);
			this.status = status;
		}

		@Override
		public void run() {
			boolean launch = false;
			synchronized (socket) {
				try {
					SocketState state = SocketState.OPEN;

					try {
						// SSL handshake
						serverSocketFactory.handshake(socket.getSocket());
					} catch (Throwable t) {
						ExceptionUtils.handleThrowable(t);
						if (log.isDebugEnabled()) {
							log.debug(sm.getString("endpoint.err.handshake"), t);
						}
						// Tell to close the socket
						state = SocketState.CLOSED;
					}

					if ((state != SocketState.CLOSED)) {
						if (status == null) {
							state = handler.process(socket, SocketStatus.OPEN_READ);
						} else {
							state = handler.process(socket, status);
						}
					}
					if (state == SocketState.CLOSED) {
						// Close socket
						if (log.isTraceEnabled()) {
							log.trace("Closing socket:" + socket);
						}
						countDownConnection();
						try {
							socket.getSocket().close();
						} catch (IOException e) {
							// Ignore
						}
					} else if (state == SocketState.OPEN ||
							state == SocketState.UPGRADING ||
							state == SocketState.UPGRADING_TOMCAT ||
							state == SocketState.UPGRADED) {
						socket.setKeptAlive(true);
						socket.access();
						launch = true;
					} else if (state == SocketState.LONG) {
						socket.access();
						waitingRequests.add(socket);
					}
				} finally {
					if (launch) {
						try {
							getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
						} catch (RejectedExecutionException x) {
							log.warn("Socket reprocessing request was rejected for:" + socket, x);
							try {
								//unable to handle connection at this time
								handler.process(socket, SocketStatus.DISCONNECT);
							} finally {
								countDownConnection();
							}


						} catch (NullPointerException npe) {
							if (running) {
								log.error(sm.getString("endpoint.launch.fail"),
										npe);
							}
						}
					}
				}
			}
			socket = null;
			// Finish up this request
		}

	}
}
